package com.xiaomaoguai.fcp.pre.kepler.router.async;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.WorkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Fixed-size worker pool built on the LMAX Disruptor. The pool runs
 * {@code workThreadNum + 1} threads in total: {@code workThreadNum} processor
 * threads draining the first-level buffer (the Disruptor ring buffer), plus
 * one daemon compensation thread draining the second-level overflow buffer
 * (a {@link ConcurrentLinkedQueue}).
 *
 * @param <O> payload type carried by each {@code HandlerEvent}
 * @author DH
 */
public class DisruptorWorkPool<O> implements WorkPool<O> {

	private static final String DEFAULT_POOL_NAME = "Processor";

	/**
	 * A warning is logged each time the overflow queue grows by this many entries.
	 */
	private static final int QUEUE_SIZE_WARNING = 5000;

	private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);

	private static final String THREAD_NAME = "Router-DisruptorWorkPool-%d-%s-%d";

	private final Logger log = LoggerFactory.getLogger(this.getClass());

	private final AtomicInteger threadNumber = new AtomicInteger(1);

	private final AtomicBoolean started = new AtomicBoolean(false);

	// Work sequence shared by all processor threads.
	private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

	// First-level buffer: the Disruptor ring buffer.
	private final RingBuffer<HandlerEvent<O>> ringBuffer;

	// Second-level (overflow) buffer.
	private final ConcurrentLinkedQueue<HandlerEvent<O>> concurrentQueue = new ConcurrentLinkedQueue<>();

	// Ring buffer capacity.
	private final Integer bufferSize;

	// Current number of events held in the overflow queue; must be kept in
	// sync with concurrentQueue by everyone who adds to / polls from it.
	private final AtomicInteger concurrentQueueSize = new AtomicInteger(0);

	// Maximum number of events allowed in the overflow queue.
	private final Integer maxQueueSize;

	// Processor threads owned by this pool.
	private final Processor<HandlerEvent<O>>[] processors;

	// Number of processor threads.
	private final Integer workThreadNum;

	/**
	 * Pool name used in thread names and log messages.
	 */
	private String asyncWorkPoolName;

	// Monitor used to wake the compensation thread when overflow work arrives.
	private final Object lock = new Object();

	// Invoked when both buffers are full and a task must be rejected.
	// Always assigned in the main constructor (defaults there when null).
	private RejectHandler<O> rejectHandler;

	// Actual event handler.
	private WorkHandler<HandlerEvent<O>> workHandler;

	/**
	 * Creates a pool with default name, a 512-slot ring buffer,
	 * {@code availableProcessors() + 1} worker threads and an unbounded
	 * overflow queue.
	 *
	 * @param workHandler  handler invoked for each event
	 * @param eventFactory factory pre-allocating ring buffer slots
	 */
	public DisruptorWorkPool(final WorkHandler<HandlerEvent<O>> workHandler,
							 final EventFactory<HandlerEvent<O>> eventFactory) {
		this(DEFAULT_POOL_NAME, 512, Runtime.getRuntime().availableProcessors() + 1, null, workHandler, null, eventFactory);
	}

	/**
	 * Fully parameterized constructor; any {@code null} argument (except
	 * {@code workHandler}/{@code eventFactory}) falls back to a default.
	 *
	 * @param asyncWorkPoolName pool name used in thread names and logs
	 * @param bufferSize        ring buffer size (default 512); should stay small
	 *                          because consumed events remain referenced until
	 *                          their slot is overwritten
	 * @param workThreadNum     worker thread count (default CPUs + 1)
	 * @param maxQueueSize      overflow queue cap (default unbounded)
	 * @param workHandler       handler invoked for each event
	 * @param rejectHandler     invoked when both buffers are full (default
	 *                          {@link RejectHandler})
	 * @param eventFactory      factory pre-allocating ring buffer slots
	 */
	@SuppressWarnings("unchecked")
	public DisruptorWorkPool(String asyncWorkPoolName, Integer bufferSize, Integer workThreadNum, Integer maxQueueSize,
							 final WorkHandler<HandlerEvent<O>> workHandler, RejectHandler<O> rejectHandler,
							 final EventFactory<HandlerEvent<O>> eventFactory) {
		this.asyncWorkPoolName = asyncWorkPoolName;
		this.bufferSize = (bufferSize == null ? 512 : bufferSize);
		this.workThreadNum = (workThreadNum == null ? Runtime.getRuntime().availableProcessors() + 1 : workThreadNum);
		this.ringBuffer = RingBuffer.createMultiProducer(eventFactory, this.bufferSize, new BlockingWaitStrategy());
		this.maxQueueSize = (maxQueueSize == null ? Integer.MAX_VALUE : maxQueueSize);
		this.rejectHandler = (rejectHandler == null ? new RejectHandler<>() : rejectHandler);
		this.workHandler = workHandler;
		final SequenceBarrier barrier = ringBuffer.newBarrier();
		// Generic array creation is not allowed; raw array + unchecked is the
		// conventional workaround.
		processors = new Processor[this.workThreadNum];
		// One extra slot for the shared workSequence, which also gates producers.
		Sequence[] sequences = new Sequence[this.workThreadNum + 1];
		for (int i = 0; i < this.workThreadNum; i++) {
			processors[i] = new Processor<>(ringBuffer, concurrentQueue, barrier, workHandler, concurrentQueueSize,
					workSequence);
			sequences[i] = processors[i].getSequence();
		}
		sequences[this.workThreadNum] = workSequence;
		ringBuffer.addGatingSequences(sequences);
	}

	/**
	 * Creates a pool with default name, a 512-slot ring buffer and an
	 * unbounded overflow queue.
	 *
	 * @param workThreadNum worker thread count
	 * @param workHandler   handler invoked for each event
	 * @param eventFactory  factory pre-allocating ring buffer slots
	 */
	public DisruptorWorkPool(final Integer workThreadNum, final WorkHandler<HandlerEvent<O>> workHandler,
							 final EventFactory<HandlerEvent<O>> eventFactory) {
		this(DEFAULT_POOL_NAME, 512, workThreadNum, null, workHandler, null, eventFactory);
	}

	/**
	 * Starts all processor threads plus the daemon compensation thread.
	 *
	 * @throws IllegalStateException if the pool was already started
	 */
	@Override
	public void start() {
		if (!started.compareAndSet(false, true)) {
			throw new IllegalStateException(
					"WorkerPool has already been started and cannot be restarted until halted.");
		}
		int poolNumber = POOL_NUMBER.getAndIncrement();
		// Align the shared work sequence with the current cursor so workers
		// only consume events published from this point on.
		final long cursor = ringBuffer.getCursor();
		workSequence.set(cursor);
		for (Processor<HandlerEvent<O>> processor : processors) {
			processor.setName(String.format(THREAD_NAME, poolNumber, asyncWorkPoolName, threadNumber.getAndIncrement()));
			processor.start();
		}
		// Start the compensation thread that drains the overflow queue.
		String compensateThread = String.format(THREAD_NAME, poolNumber, "daemon", 1);
		createCompensateThread(compensateThread);
	}

	/**
	 * Creates the daemon thread that drains the second-level overflow queue.
	 * It is woken by {@link #addTask(Object)} whenever overflow work arrives,
	 * and polls at the latest every two seconds regardless.
	 *
	 * @param name thread name
	 */
	private void createCompensateThread(String name) {
		Thread t = new Thread(new Runnable() {

			@Override
			public void run() {
				while (true) {
					try {
						synchronized (lock) {
							lock.wait(2000L);
						}
						HandlerEvent<O> event = concurrentQueue.poll();
						if (event != null) {
							// Keep the size counter in sync with the queue so the
							// overflow limit check in addTask stays accurate.
							concurrentQueueSize.decrementAndGet();
							workHandler.onEvent(event);
						}
					} catch (InterruptedException e) {
						// Restore the interrupt flag and let the thread exit.
						Thread.currentThread().interrupt();
						return;
					} catch (Exception e) {
						log.error("compensateThread handler error", e);
					}
				}
			}
		});
		t.setDaemon(true);
		t.setName(name);
		t.start();
	}

	/**
	 * Submits a task. Publishing is attempted on the ring buffer first; if it
	 * has no free slot the task goes to the overflow queue (bounded by
	 * {@code maxQueueSize}); if that is also full the task is handed to the
	 * reject handler. Tasks submitted while the pool is not started are
	 * silently dropped.
	 *
	 * <p>Although the ring buffer is often described as unbounded, it is a
	 * fixed-size array whose slots are reused: consumed events remain
	 * referenced (and un-collectable) until overwritten, so the buffer is kept
	 * small. Long-running async tasks can therefore fill it, which is exactly
	 * why the second-level queue exists. {@code tryPublishEvent} is used so a
	 * full buffer fails fast instead of blocking the caller.
	 *
	 * @param o the task payload
	 */
	@Override
	public void addTask(O o) {
		if (!started.get()) {
			return;
		}
		boolean published = ringBuffer.tryPublishEvent(new EventTranslatorOneArg<HandlerEvent<O>, O>() {

			@Override
			public void translateTo(HandlerEvent<O> event, long sequence, O arg0) {
				event.setData(arg0);
			}
		}, o);
		if (published) {
			return;
		}
		int queueSize = concurrentQueueSize.get();
		// Admit overflow work only while the queue is strictly below its cap.
		if (queueSize < maxQueueSize) {
			queueSizeWarning(queueSize);
			concurrentQueue.add(new HandlerEvent<>(o));
			concurrentQueueSize.incrementAndGet();
			// Wake the compensation thread to consume the overflow event.
			synchronized (lock) {
				lock.notifyAll();
			}
		} else {
			log.error("===>asyncWorkPoolName:{} over limit max message nums,cur queueSize:{}", asyncWorkPoolName, queueSize);
			rejectHandler.onEvent(o);
		}
	}

	/**
	 * Logs a warning every {@value #QUEUE_SIZE_WARNING} entries as the
	 * overflow queue grows.
	 *
	 * @param queueSize current overflow queue size
	 */
	private void queueSizeWarning(int queueSize) {
		if (queueSize >= QUEUE_SIZE_WARNING && queueSize % QUEUE_SIZE_WARNING == 0) {
			log.error("===>asyncWorkPoolName:{} queue size warning,cur queueSize:{}", asyncWorkPoolName, queueSize);
		}
	}

	/**
	 * Busy-waits until every event published to the ring buffer has been
	 * claimed by a worker, then halts the processor threads.
	 *
	 * <p>NOTE(review): this drains only the first-level ring buffer; events
	 * still sitting in the overflow queue are left to the daemon compensation
	 * thread, which dies with the JVM — confirm that is acceptable for callers.
	 */
	@Override
	public void shutdownGracefully() {
		while (ringBuffer.getCursor() > workSequence.get()) {
			Thread.yield();
		}
		for (Processor<?> processor : processors) {
			processor.halt();
		}
		started.set(false);
	}

	/**
	 * Halts each processor thread after it finishes its current loop
	 * iteration; pending events are not drained.
	 */
	@Override
	public void shutdown() {
		for (Processor<HandlerEvent<O>> processor : processors) {
			processor.halt();
		}
		started.set(false);
	}

	/**
	 * @return {@code true} while the pool is started and accepting tasks
	 */
	@Override
	public boolean isRunning() {
		return started.get();
	}

	public Integer getBufferSize() {
		return bufferSize;
	}

	public Integer getWorkThreadNum() {
		return workThreadNum;
	}

	public WorkHandler<HandlerEvent<O>> getWorkHandler() {
		return workHandler;
	}

	public void setWorkHandler(WorkHandler<HandlerEvent<O>> workHandler) {
		this.workHandler = workHandler;
	}

}
